Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Serialize DataFrame/Series using IPC in serde #20266

Merged
merged 11 commits into from
Dec 17, 2024

Conversation

nameexhaustion
Copy link
Collaborator

@nameexhaustion nameexhaustion commented Dec 12, 2024

We currently use custom serialization code, this PR streamlines the implementation to instead use IPC, which has support for more types, and can also be faster in some cases.

I've also swapped the serialization from ciborium to bincode, mainly due to an issue deserializing large byte data (enarx/ciborium#96), but bincode also appears to do better in benchmarks and has more downloads.

serialize(binary) benchmarks:

column size_before size_after %size time_before(s) time_after(s) runtime speedup
pl.select(pl.int_range(0, 30_000_000, dtype=pl.UInt64)) 150.0MB 0.61MB 0.4% 1.143455 0.623622 1.8x
pl.select(pl.repeat(1, 30_000_000, dtype=pl.UInt64)) 30.0MB 0.00054MB 0.0018% 1.087035 0.087299 12.0x
pl.read_csv('.env/data/yellow_tripdata_2015-01_head1m.csv') 110.0MB 25.0MB 23.0% 0.811681 0.785048 1.0x
pl.read_csv('.env/iris/iris.csv') 6.3KB 1.7KB 27.0% 0.001379 0.001247 1.1x

Note:

  • Time measurements are sum of both serialization and deserialization time

Note that there may be some degradations for the serialize(format="json") - this tradeoff is acceptable since it's deprecated functionality.

Exploratory benchmark testing
  • File: yellow_tripdata_2015-01_head1M.csv (1M rows)
  • Sorted by sum of serialization and deserialization time
  • Performed on EC2 c7i.4xlarge
  • Note: The compression here is applied only at the final serialization stage, the latest approach is to compress both during IPC writing and the final output data.
row_nr COLUMN_SERIALIZER SERIALIZER COMPRESSION serialized_size serialize_time deserialize_time
1 LEGACY BINCODE NONE 210.0MB 0.135096 0.364147
2 IPC BINCODE NONE 214.01MB 0.399154 0.222068
3 (proposed (binary)) IPC BINCODE ZSTD 43.46MB 0.64739 0.372755
4 (current (json)) LEGACY SERDE_JSON NONE 162.91MB 0.556199 0.694452
5 (current (binary)) LEGACY CIBORIUM NONE 108.73MB 0.234633 1.309476
6 (proposed (json)) IPC SERDE_JSON NONE 556.59MB 1.141087 2.623377
7 LEGACY CIBORIUM ZSTD 33.2MB 6.108194 2.157196
8 LEGACY BINCODE ZSTD 38.15MB 7.217784 1.753814
9 LEGACY SERDE_JSON ZSTD 54.7MB 8.413706 4.899252
10 IPC SERDE_JSON ZSTD 71.3MB 71.410487 16.191857
11 IPC CIBORIUM ZSTD 43.46MB 0.644313 failed
12 IPC CIBORIUM NONE 214.01MB 0.399962 failed

Photo
image

Results CSV

bench_serialize_processed_results.csv

Benchmark code
import polars as pl
import os
import io
from itertools import product
from time import perf_counter

FILES_ROOT = ".env/data"
print(f"{FILES_ROOT = }")

out_path = ".env/bench_serialize_results.csv"


class Matrix:
    FILES = ["iris.csv", "yellow_tripdata_2015-01_head1M.csv"]
    POLARS_SERIALIZE_COMPRESSION = ["ZSTD", "NONE"]
    POLARS_COLUMN_SERIALIZER = ["LEGACY", "IPC"]
    POLARS_SERIALIZER = ["BINCODE", "CIBORIUM", "SERDE_JSON"]


def get_serialize_stats(value):
    r = {}
    t = perf_counter()
    b = getattr(value, "serialize")()
    r["serialize_time"] = perf_counter() - t
    r["serialized_size"] = len(b)
    r["serialized_prefix"] = repr(b)

    # Deserialization may fail
    r["deserialize_time"] = None

    try:
        t = perf_counter()
        getattr(value, "deserialize")(io.BytesIO(b))
        r["deserialize_time"] = perf_counter() - t
    except Exception:
        pass

    return r


results = []

combinations = [
    *product(
        Matrix.POLARS_SERIALIZE_COMPRESSION,
        Matrix.POLARS_COLUMN_SERIALIZER,
        Matrix.POLARS_SERIALIZER,
    )
]


def iter_targets():
    for fname in Matrix.FILES:
        path = f"{FILES_ROOT}/{fname}"
        df = pl.read_csv(path)

        yield fname, df

    yield (
        "q1_plan",
        (
            pl.LazyFrame(
                schema={
                    "l_orderkey": pl.Int64,
                    "l_partkey": pl.Int64,
                    "l_suppkey": pl.Int64,
                    "l_linenumber": pl.Int64,
                    "l_quantity": pl.Int64,
                    "l_extendedprice": pl.Float64,
                    "l_discount": pl.Float64,
                    "l_tax": pl.Float64,
                    "l_returnflag": pl.String,
                    "l_linestatus": pl.String,
                    "l_shipdate": pl.String,
                    "l_commitdate": pl.String,
                    "l_receiptdate": pl.String,
                    "l_shipinstruct": pl.String,
                    "l_shipmode": pl.String,
                    "comments": pl.String,
                }
            )
            .filter(pl.col("l_shipdate") <= pl.mean("l_shipdate"))
            .group_by("l_returnflag", "l_linestatus")
            .agg(
                pl.sum("l_quantity").alias("sum_qty"),
                pl.sum("l_extendedprice").alias("sum_base_price"),
                (pl.col("l_extendedprice") * (1.0 - pl.col("l_discount")))
                .sum()
                .alias("sum_disc_price"),
                (
                    pl.col("l_extendedprice")
                    * (1.0 - pl.col("l_discount"))
                    * (1.0 + pl.col("l_tax"))
                )
                .sum()
                .alias("sum_charge"),
                pl.mean("l_quantity").alias("avg_qty"),
                pl.mean("l_extendedprice").alias("avg_price"),
                pl.mean("l_discount").alias("avg_disc"),
                pl.len().alias("count_order"),
            )
            .sort("l_returnflag", "l_linestatus")
        ),
    )


for fname, value in iter_targets():
    raw_ipc_size = None

    if isinstance(value, pl.DataFrame):
        f = io.BytesIO()
        value.write_ipc(f)
        raw_ipc_size = f.tell()
        del f

    for compression, col_serializer, serializer in combinations:
        print(fname, col_serializer, serializer, compression)

        r = {
            "file": fname,
            "POLARS_COLUMN_SERIALIZER": col_serializer,
            "POLARS_SERIALIZER": serializer,
            "POLARS_SERIALIZE_COMPRESSION": compression,
        }

        for k, v in r.items():
            os.environ[k] = v

        r.update(get_serialize_stats(value))
        r["raw_ipc_size"] = raw_ipc_size
        results.append(r)


df = pl.from_records(results).with_columns(
    pl.format(
        "{}-{}",
        pl.col("serialized_prefix").str.slice(2, 10),
        pl.col("serialized_prefix").hash(),
    )
)

with pl.Config(
    fmt_str_lengths=99,
    tbl_cols=99,
    tbl_rows=99,
    tbl_formatting="ASCII_MARKDOWN",
    tbl_hide_column_data_types=True,
):
    print(df)


print(df.write_csv())

df.write_csv(out_path)

if (
    v := df.filter(pl.col("serialized_prefix").is_duplicated()).sort(
        "serialized_prefix", maintain_order=True
    )
).height > 0:
    print("WARN: Duplicated serialized values:")
    print(v)
else:
    print("OK")

Fixes #17211

@github-actions github-actions bot added enhancement New feature or an improvement of an existing feature rust Related to Rust Polars labels Dec 12, 2024
@nameexhaustion nameexhaustion changed the title feat(rust): Serialize DataFrame/Series using IPC in serde feat: Serialize DataFrame/Series using IPC in serde Dec 12, 2024
@github-actions github-actions bot added the python Related to Python Polars label Dec 12, 2024
struct SerializeWrap {
name: PlSmallStr,
/// Unit-length series for dispatching to IPC serialize
unit_series: Series,
Copy link
Collaborator Author

@nameexhaustion nameexhaustion Dec 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For ScalarColumn I had the option of either using the serde::Serialize impl from AnyValue, or converting it to a unit-length Series and dispatch to IPC. I chose the IPC option as the AnyValue serde impl was missing quite a lot of dtypes, and using IPC would also give more assurance that the serialize behavior is the same with the SeriesColumn.


#[cfg(feature = "serde")]
#[test]
fn test_deserialize_height_validation_8751() {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test was moved from Python - the existing test used an exact string representation of the previous JSON format, but that is changed after this PR.

@ritchie46
Copy link
Member

This will change the format of the serialized JSON values - I'm not entirely sure if this would be considered a breaking change there.

Our docs:

        Notes
        -----
        Serialization is not stable across Polars versions: a LazyFrame serialized
        in one Polars version may not be deserializable in another Polars version.

We don't guarantee serialized data format. :)

@nameexhaustion nameexhaustion force-pushed the serialize-ipc branch 3 times, most recently from 5466abf to a6ff973 Compare December 16, 2024 03:54
Copy link

codecov bot commented Dec 17, 2024

Codecov Report

Attention: Patch coverage is 90.52925% with 34 lines in your changes missing coverage. Please review.

Project coverage is 79.49%. Comparing base (2fc26cf) to head (8bb35ce).
Report is 4 commits behind head on main.

Files with missing lines Patch % Lines
crates/polars-python/src/dataframe/serde.rs 33.33% 14 Missing ⚠️
crates/polars-core/src/serde/series.rs 87.05% 11 Missing ⚠️
crates/polars-utils/src/pl_serialize.rs 96.73% 3 Missing ⚠️
crates/polars-core/src/datatypes/_serde.rs 93.93% 2 Missing ⚠️
crates/polars-python/src/cloud.rs 0.00% 2 Missing ⚠️
crates/polars-core/src/frame/column/partitioned.rs 0.00% 1 Missing ⚠️
crates/polars-plan/src/plans/options.rs 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main   #20266      +/-   ##
==========================================
- Coverage   79.60%   79.49%   -0.12%     
==========================================
  Files        1567     1569       +2     
  Lines      218528   218634     +106     
  Branches     2462     2462              
==========================================
- Hits       173969   173801     -168     
- Misses      43992    44265     +273     
- Partials      567      568       +1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@nameexhaustion nameexhaustion marked this pull request as draft December 17, 2024 09:27
@nameexhaustion nameexhaustion marked this pull request as ready for review December 17, 2024 09:28
@ritchie46 ritchie46 merged commit e03555c into pola-rs:main Dec 17, 2024
26 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or an improvement of an existing feature python Related to Python Polars rust Related to Rust Polars
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Serializing float columns with format="json" turns inf/nan values into null
2 participants